package com.atguigu.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.MyKafka;
import com.atguigu.gmall.realtime.utils.MyKafkaPro;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.text.SimpleDateFormat;

//独立访客计算，也就是日活统计
// Unique-visitor (UV / daily-active) job: consumes page-view logs from the
// dwd_topic_page Kafka topic, keeps only each device's FIRST session-entry
// page of the calendar day, and writes the surviving records to the
// dwm_unique_visit topic.
public class UniqueVisitApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Checkpoint configuration: snapshot every 5 s with exactly-once semantics.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // A checkpoint must finish within one minute or it is discarded.
        // BUGFIX: was 6000 (6 s), contradicting the intended one-minute timeout.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Checkpoint storage location on HDFS, and the user identity used to write there.
        env.setStateBackend(new FsStateBackend("hdfs://hadoop104:8020/gmall/flink/checkpoint"));
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        // Restart strategy: up to 3 attempts, 3 s apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));

        // Source topic and consumer group.
        String topic = "dwd_topic_page";
        String groupId = "UniqueVisitApp";

        // TODO 1: consume the dwd page-log topic as the source.
        FlinkKafkaConsumer<String> kafkaSource = MyKafka.getFlinkKafkaConsumer(topic, groupId);
        DataStreamSource<String> kafkaDStream = env.addSource(kafkaSource);

        // Records arriving from dwd_topic_page look like:
        // {
        //   "common":   { "mid": "mid_9", "uid": "39", "is_new": "0", ... },
        //   "page":     { "page_id": "good_list", "last_page_id": "home", ... },
        //   "displays": [ { "display_type": "promotion", "item": "1", ... }, ... ],
        //   "ts": 1618015293000
        // }

        // TODO 2: compute daily-active visitors and write them to dwm_unique_visit.
        // 2.1 Parse each JSON string into a JSONObject.
        SingleOutputStreamOperator<JSONObject> jsonObjDStream =
                kafkaDStream.map(JSONObject::parseObject);

        // 2.2 Key by device id (common.mid) so per-device keyed state is available.
        KeyedStream<JSONObject, String> keyedJsonObjDStream =
                jsonObjDStream.keyBy(r -> r.getJSONObject("common").getString("mid"));

        // 2.3 Keep only a device's first session-entry page of the day.
        //     Entry pages carry no last_page_id; everything else is filtered out,
        //     and repeat entries on the same calendar day are dropped via keyed state.
        SingleOutputStreamOperator<JSONObject> filterDStream = keyedJsonObjDStream.filter(
                new RichFilterFunction<JSONObject>() {
                    // Last visit date ("yyyyMMdd") for the current key (device id).
                    private ValueState<String> lastVisitDataState;
                    // Date formatter; safe here because each parallel subtask gets
                    // its own instance in open() and processes records single-threaded.
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sdf = new SimpleDateFormat("yyyyMMdd");
                        ValueStateDescriptor<String> valueStateDescriptor =
                                new ValueStateDescriptor<>("ValueStateDescriptor", String.class);

                        // Daily-active state only matters for one day; expire it
                        // after 24 h so per-device state does not accumulate forever.
                        StateTtlConfig stateTtlConf = StateTtlConfig
                                .newBuilder(Time.days(1L))
                                // Restart the TTL clock whenever the state is written.
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                // Never hand back a value that has already expired.
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();
                        valueStateDescriptor.enableTimeToLive(stateTtlConf);

                        lastVisitDataState = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public boolean filter(JSONObject jsonObj) throws Exception {
                        // Only session entry pages (no last_page_id) can count as a visit.
                        String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                        if (lastPageId != null && lastPageId.length() > 0) {
                            return false;
                        }

                        // Derive the record's calendar day from its event timestamp.
                        Long ts = jsonObj.getLong("ts");
                        String curDate = sdf.format(ts);

                        // BUGFIX: compare the stored date with the record's date instead
                        // of only checking that state exists. The TTL window is a rolling
                        // 24 h (OnCreateAndWrite), not midnight-aligned, so a visit late
                        // in the evening must not suppress the first visit of the NEXT
                        // calendar day.
                        String lastVisitDate = lastVisitDataState.value();
                        if (curDate.equals(lastVisitDate)) {
                            // Already counted today: drop the record.
                            return false;
                        } else {
                            // First visit of this day: remember it and keep the record.
                            lastVisitDataState.update(curDate);
                            return true;
                        }
                    }
                }
        );

        // Serialize the surviving records and publish them as daily-active data.
        SingleOutputStreamOperator<String> jsonStrDStream =
                filterDStream.map(JSONObject::toJSONString);

        jsonStrDStream.print(">>>>>日活数据.");
        jsonStrDStream.addSink(MyKafkaPro.getFlinkKafkaProducer("dwm_unique_visit"));

        env.execute();
    }
}
